
[SPARK-50639][SQL] Improve warning logging in CacheManager #49276

Open. Wants to merge 1 commit into base: master.
Conversation

vrozov
Member

@vrozov vrozov commented Dec 23, 2024

What changes were proposed in this pull request?

The change improves warning logging in the CacheManager by:

  1. Adding logical plan details to the existing warning messages.
  2. Logging a warning when an attempt is made to remove data from the cache but the data is not present.

Why are the changes needed?

The change helps identify incorrect calls to Dataset.persist() and Dataset.unpersist(), as in:

Dataset<Row> dataset = ...
Dataset<Row> dataset1 = dataset.withColumn(...);
Dataset<Row> dataset2 = dataset1.withColumn(...);
dataset.persist(); // OK
dataset1.persist(); // OK
dataset.persist(); // currently logs warning without logical plan details
dataset.unpersist(); // OK
dataset.unpersist(); // no warning
dataset2.unpersist(); // no warning, the actual call should be on dataset1

Does this PR introduce any user-facing change?

Users may see warning messages like:

23.12.2024 19:15:03.840 WARN  [pool-30-thread-1] org.apache.spark.sql.execution.CacheManager - An attempt was made to cache data even though the data had already been cached. Please un-cache data or clear cache first.
Logical plan:
Relation [i#0] JDBCRelation(test_table) [numPartitions=1]

and

23.12.2024 19:15:04.207 WARN  [pool-30-thread-1] org.apache.spark.sql.execution.CacheManager - Data has not been previously cached or it was removed from the cache already.
Logical plan:
Project [i#0, i#0 AS year#6]
+- Relation [i#0] JDBCRelation(test_table) [numPartitions=1]

How was this patch tested?

The change modifies warning log messages.

Was this patch authored or co-authored using generative AI tooling?

No.

@vrozov
Member Author

vrozov commented Dec 31, 2024

@hvanhovell please review

@vrozov
Member Author

vrozov commented Jan 7, 2025

@gengliangwang can you take a look

    val shouldRemove: LogicalPlan => Boolean =
      if (cascade) {
        _.exists(isMatchedPlan)
      } else {
        isMatchedPlan
      }
-   val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
+   var plansToUncache: IndexedSeq[CachedData] = null
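For context, the shouldRemove predicate in the hunk above distinguishes cascading from non-cascading uncache: with cascade, any cached plan whose tree contains a matching subplan is removed; without it, only plans that match directly. A rough Python model (not Spark code; the Plan class and helper names are illustrative only):

```python
# Toy model (not Spark code) of the cascade / non-cascade predicate:
# with cascade=True, a cached plan is removed if ANY node in its tree matches;
# with cascade=False, only if the root plan itself matches.
class Plan:
    def __init__(self, name, *children):
        self.name = name
        self.children = list(children)

    def exists(self, pred):
        """True if pred holds for this node or any descendant."""
        return pred(self) or any(c.exists(pred) for c in self.children)

def should_remove(plan, is_matched, cascade):
    return plan.exists(is_matched) if cascade else is_matched(plan)

def matches_relation(p):
    return p.name == "Relation"

relation = Plan("Relation")
project = Plan("Project", relation)  # a Project built on top of the Relation

assert should_remove(project, matches_relation, cascade=True)       # subtree matches
assert not should_remove(project, matches_relation, cascade=False)  # root differs
```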
Member

@vrozov why do we need the code change here if this PR is to improve the logging?

Member Author

@gengliangwang The logging relies on the plansToUncache.nonEmpty check added on line 288 to correctly log warnings, so the change is necessary to prevent a race condition (cachedData should be accessed under synchronized).

@vrozov
Member Author

vrozov commented Jan 10, 2025

@gengliangwang please check my reply

@gengliangwang
Member

@vrozov sorry for the late reply.
Before we move forward, I think the changes in this PR are already covered in #45990. If you set spark.sql.dataframeCache.logLevel as WARN, you will see similar logs for cache/uncache.
cc @anchovYu

@vrozov
Member Author

vrozov commented Jan 10, 2025

The change in #45990 provides troubleshooting options for the CacheManager and is useful for debugging memory leaks in it. The problem is that it is not enabled by default (the default level is TRACE) and it produces a large amount of logging. This PR enables early warning notifications, and users can then further troubleshoot CacheManager issues by setting spark.sql.dataframeCache.logLevel.

@vrozov
Member Author

vrozov commented Jan 14, 2025

@gengliangwang, @anchovYu please check my reply.

@gengliangwang
Member

gengliangwang commented Jan 14, 2025

@vrozov the changes in this PR overlap with PR #45990. How about we simply change the log level from TRACE to INFO?

@vrozov
Member Author

vrozov commented Jan 14, 2025

@gengliangwang My understanding is that the log level was intentionally set to TRACE and should not be enabled by default. Please see the comment on #45990:

Because every query applies cache, this log could be huge and should be only turned on during some debugging process, and should not enabled by default in production.

Note that the warnings on lines 129 and 145 coexist with the changes from #45990 and provide early problem notification.

This PR originates from a real issue where I spent a large amount of time first isolating a memory leak to the CacheManager and then tracing it to an unpersist() call on the wrong Dataset. Had the warning been present in the first place, the problem would have been much easier to identify.

@gengliangwang
Member

Note that the warnings on lines 129 and 145 coexist with the changes from #45990 and provide early problem notification.

Can you provide more details? I think they are similar. Please check the method calls of CacheManager.logCacheOperation.

@vrozov
Member Author

vrozov commented Jan 15, 2025

@gengliangwang Please check lines 129 and 145. Those are pre-existing warnings (they existed prior to #45990 and were not removed as part of it), and they use logWarning(), not CacheManager.logCacheOperation() (which depends on the spark.sql.dataframeCache.logLevel setting and is off by default). I think this is the right approach: the changes in #45990 provide a means to further troubleshoot the CacheManager (without code debugging), while the warnings in this PR provide early problem notification.

I think they are similar.

They are related but serve different purposes. Entries that use logWarning() are enabled by default (including in production), while logCacheOperation() entries are disabled by default and should only be enabled to debug caching issues.

@vrozov
Member Author

vrozov commented Jan 16, 2025

@gengliangwang Please check my reply.

To clarify why I think #45990 and #49276 are related but do not overlap: the changes in #45990 log (trace by default) messages when the cache is modified (an item is added or removed), while the changes in #49276 log warning messages when the cache is expected to be modified by the user but is not (an item is not added or not removed), along with details about the Dataset used in the call to persist() or unpersist().
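The distinction can be illustrated with a toy model (plain Python, not Spark code; the class name and the messages are only stand-ins for the real CacheManager): modification logs fire on every successful cache change, while the warnings proposed here fire only when a requested change turns out to be a no-op.

```python
import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger("CacheManager")

class ToyCacheManager:
    """Toy model (not Spark code) of the warning behavior proposed in this PR:
    persist() on an already-cached plan and unpersist() on a missing plan
    each log a warning that includes the logical plan."""

    def __init__(self):
        self._cache = {}  # plan string -> cached "data"

    def persist(self, plan, data):
        if plan in self._cache:
            log.warning("Data was already cached.\nLogical plan:\n%s", plan)
            return False
        self._cache[plan] = data
        return True

    def unpersist(self, plan):
        if plan not in self._cache:
            log.warning("Data has not been previously cached or it was "
                        "removed from the cache already.\nLogical plan:\n%s", plan)
            return False
        del self._cache[plan]
        return True

cm = ToyCacheManager()
assert cm.persist("Relation [i#0]", object())
assert not cm.persist("Relation [i#0]", object())  # warns, with the plan included
assert cm.unpersist("Relation [i#0]")
assert not cm.unpersist("Relation [i#0]")          # warns: nothing to remove
```

In this sketch, the two False returns correspond to the two no-op situations that the PR's warnings cover.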

@vrozov
Member Author

vrozov commented Jan 21, 2025

@gengliangwang Please check my reply.

@@ -126,7 +126,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
    if (storageLevel == StorageLevel.NONE) {
      // Do nothing for StorageLevel.NONE since it will not actually cache any data.
    } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
-     logWarning("Asked to cache already cached data.")
+     logWarning(log"An attempt was made to cache data even though the data had already been " +
Member

@vrozov so in the method call of lookupCachedDataInternal, it will output

CacheManager.logCacheOperation(log"Dataframe cache hit for input plan:" +
        log"\n${MDC(QUERY_PLAN, plan)} matched with cache entry:" +
        log"${MDC(DATAFRAME_CACHE_ENTRY, result.get)}")

while the change here suggests that users un-cache the data, instead of noting that the cache was hit.

Member

The log from #45990 is more accurate, right?

Member Author

No, it is not. The cache hit is irrelevant in this case: the caller (user) does not expect the entry to be present in the cache, and the user explicitly calls persist(), while the cache lookup is an internal operation not initiated by the caller. The warning message in #49276 means that the user either does not need to call persist() (and should remove it) or there is a missing or wrong call to unpersist().

Member Author

@gengliangwang Please see my response.

Member Author

@gengliangwang Please see my response.

Member

@vrozov can you explain why we need to un-cache data or clear the cache? If the plan is already cached, then we don't need to cache it again.

Member Author

@gengliangwang it sounds like we are going in circles here. The warning message is logged when persist() is called on a Dataset that is already present in the cache, so it indicates a bug in the code that calls persist(). Either there is a missing call to unpersist() on the Dataset (there may be a call to unpersist() on another Dataset that was never cached, as in the sample I provided in the PR description), or the call is not necessary at all. The warning message is a way to notify the user that there is a problem (bug) in the code; it is up to the user to decide how to fix it. If you have a better suggestion for the warning message wording, please post it here. As far as I can see, the proposed warning message indicates that

  • the data had already been cached, so the call may not be necessary
  • there may be a missing call to unpersist() or to clear the cache

Member

Please un-cache data or clear cache first.\nLogical plan:\n"

Asking users to call unpersist is unnecessary and super confusing.

Member Author
@vrozov vrozov Feb 4, 2025

@gengliangwang

  • I disagree that it is super confusing, and I explained when a call to unpersist() may be necessary.
  • To move this PR forward, I updated the warning message and removed the reference to un-caching data.


@gengliangwang This code change adds value to the warning log, as developers can easily identify which query plan was already persisted.

  • Before the change: the warning log only showed "Asked to cache already cached data." Developers could not identify which query plan was already cached from the warning message. For a large project, the warning adds little value because there may be too many DataFrames.

  • After the change: the warning log shows which query plan was already cached, so developers can easily check their code to identify the unnecessary cache/persist for a specific DataFrame.

Comment on lines -209 to +215
-   uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
+   if (!uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)) {
+     logWarning(log"Data has not been previously cached or it was removed from the " +
+       log"cache already.\nLogical plan:\n${MDC(QUERY_PLAN, plan)}")
+   }


@gengliangwang This log is to warn developers that they are trying to unpersist a query plan that has not been previously cached, and to show the related query plan details.

For example, this sample PySpark code tries to unpersist a redefined DataFrame. This leaves the query plan of the original cached DataFrame in the CacheManager. If this happens in a for loop or in a Spark Structured Streaming foreachBatch, driver memory will constantly increase and lead to a memory issue.

    df = spark.createDataFrame(data, ["name", "age", "city"])
    df.persist()
    df.show()
    df = df.withColumn("NAME", upper(col("name")))
    df.show()
    df.unpersist()

The proposed change helps developers easily see that they are trying to unpersist a query plan that has not been previously cached. They can then review their code to confirm whether they are unpersisting the wrong DataFrame.
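The leak pattern above can be shown with a minimal model (plain Python, not Spark; the plan strings stand in for logical plans): because reassigning df changes its logical plan, unpersist() looks up a key that was never cached and silently misses, so the entry cached in each iteration is never removed.

```python
# Toy illustration (not Spark code) of the leak pattern described above:
# the cached entry is keyed by the original plan, but unpersist() is called
# with the transformed plan, so the entry is never removed and the cache grows.
cache = {}

def persist(plan):
    cache.setdefault(plan, object())

def unpersist(plan):
    cache.pop(plan, None)  # silently a no-op when the plan was never cached

for batch in range(3):
    plan = f"Relation df#{batch}"
    persist(plan)                      # caches the original plan
    transformed = plan + " +Project"   # df = df.withColumn(...) changes the plan
    unpersist(transformed)             # misses: this plan was never cached

assert len(cache) == 3  # one stale entry per iteration: a steady leak
```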

@yangguoaws yangguoaws left a comment

@vrozov @gengliangwang I added comments for the two warning log changes and illustrated their added value for developers. Please let me know if any changes are needed to move this PR forward.

@vrozov
Member Author

vrozov commented Feb 10, 2025

@gengliangwang Please review

@vrozov
Member Author

vrozov commented Feb 14, 2025

@gengliangwang Please review

@@ -126,7 +126,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
    if (storageLevel == StorageLevel.NONE) {
      // Do nothing for StorageLevel.NONE since it will not actually cache any data.
    } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
-     logWarning("Asked to cache already cached data.")
+     logWarning(log"An attempt was made to cache data even though the data had already been " +
Member

Again, this is very similar to the changes in https://github.com/apache/spark/pull/45990/files#diff-88635a13b65f19dcc80b865d903b498b8328607f96c088402a8ebdbb857eedf9R303. I don't think we should have two duplicated logs with different log levels.

Member Author

@gengliangwang The log entry is not added as part of this PR, so I don't follow your concern.

@vrozov
Member Author

vrozov commented Feb 24, 2025

@gengliangwang Please see my response to your comment

@vrozov
Member Author

vrozov commented Feb 26, 2025

@gengliangwang ^^^

@vrozov
Member Author

vrozov commented Feb 28, 2025

@gengliangwang ?

@vrozov
Member Author

vrozov commented Mar 3, 2025

@hvanhovell @dongjoon-hyun Please take a look.

@vrozov
Member Author

vrozov commented Mar 6, 2025

@hvanhovell ? @dongjoon-hyun ?


3 participants